package gu.simplemq.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;

import gu.simplemq.BaseMQProducer;
import gu.simplemq.Constant;
import gu.simplemq.IAdvisor;
import gu.simplemq.IProducer;
import gu.simplemq.MQConnectionException;
import gu.simplemq.MQRuntimeException;
import gu.simplemq.ZeroAdvisor;

/**
 * MQTT implementation of {@link IProducer}.
 * <p>
 * Every message is published through a {@link MqttClient} using the QoS level
 * read from the {@link MqttPoolLazy} supplied at construction time; the
 * retained flag is always {@code false}.
 *
 * @author guyadong
 */
public class MqttProducer extends BaseMQProducer<MqttClient> implements Constant{
	/** QoS level passed to every publish call; read once from the pool in the constructor. */
	private final int qos;
	/** Advisor instance handed out by {@link #getAdvisor()}. */
	private final IAdvisor advisor = new ZeroAdvisor();

	/**
	 * Creates a producer on top of the given pool.
	 *
	 * @param poolLazy pool forwarded to the super class; also the source of the QoS level
	 */
	public MqttProducer(MqttPoolLazy poolLazy) {
		super(poolLazy);
		this.qos = poolLazy.getQos();
	}

	/**
	 * Publishes each message, in iteration order, to {@code channel}.
	 * The first failing publish aborts the loop, so later messages are not sent.
	 *
	 * @param c client used for publishing
	 * @param channel topic name passed to the client's publish call
	 * @param messages payloads to publish; each is converted with {@code getBytes(UTF_8)}
	 * @throws MQConnectionException if the client reports
	 *         {@link MqttException#REASON_CODE_CLIENT_NOT_CONNECTED}
	 * @throws MQRuntimeException for any other failure raised while publishing a message
	 */
	@Override
	protected void doSend(MqttClient c, String channel, Iterable<String> messages) throws Exception {
		for (String payload : messages) {
			publishMessage(c, channel, payload);
		}
	}

	/**
	 * Publishes a single payload and translates failures into the MQ exception types.
	 *
	 * @param client client used for publishing
	 * @param topic topic name to publish to
	 * @param payload message text to encode and send
	 */
	private void publishMessage(MqttClient client, String topic, String payload) throws Exception {
		try {
			// publish the message (not retained)
			client.publish(topic, payload.getBytes(UTF_8), qos, false);
		} catch (MqttException mqttFailure) {
			// only the "client not connected" reason code is reported as a connection problem
			if (MqttException.REASON_CODE_CLIENT_NOT_CONNECTED != mqttFailure.getReasonCode()) {
				throw new MQRuntimeException(mqttFailure);
			}
			throw new MQConnectionException(mqttFailure);
		} catch (Throwable unexpected) {
			// anything else (including encoding problems or a null payload) is wrapped as well
			throw new MQRuntimeException(unexpected);
		}
	}

	/**
	 * @return the advisor instance created together with this producer
	 */
	@Override
	public IAdvisor getAdvisor() {
		return advisor;
	}

}
